package queue

import (
	"errors"
	"gitee.com/zhucheer/orange/cfg"
	"sync"
)

type MqProducer interface {
	SendMsg(topic string, body string) (mqMsg MqMsg, err error)
	SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
}

type MqConsumer interface {
	ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)
}

const (
	_ = iota
	SendMsg
	ReceiveMsg
)

type MqMsg struct {
	RunType int    `json:"run_type"`
	Topic   string `json:"topic"`
	MsgId   string `json:"msg_id"`
	Body    []byte `json:"body"`
}

var mqProducerInstanceMap map[string]MqProducer
var mqConsumerInstanceMap map[string]MqConsumer
var mutex sync.Mutex

func init() {
	mqProducerInstanceMap = make(map[string]MqProducer)
	mqConsumerInstanceMap = make(map[string]MqConsumer)
}

// NewProducer 新建一个生产者实例
func NewProducer(groupName string) (mqClient MqProducer, err error) {
	if item, ok := mqProducerInstanceMap[groupName]; ok {
		return item, nil
	}
	driver := cfg.GetString("queue.driver", "")
	endpoints := cfg.GetSliceString("queue.endpoints", []string{})
	if len(endpoints) == 0 {
		return mqClient, errors.New("endpoints is not found.")
	}
	retry := cfg.GetInt("queue.retry", 2)

	if groupName == "" {
		return mqClient, errors.New("mq groupName is empty.")
	}

	switch driver {
	case "rocketmq":
		mqClient = RegisterRocketProducerMust(endpoints, groupName, retry)
	case "redis":
		passwd := cfg.GetString("queue.passwd", "")
		dbnum := cfg.GetInt("queue.redisdb", 0)
		timeout := cfg.GetInt("queue.timeout", 3600)
		mqClient = RegisterRedisMqProducerMust(RedisOption{
			Addr:    endpoints[0],
			Passwd:  passwd,
			DBnum:   dbnum,
			Timeout: timeout,
		}, PoolOption{
			5, 50, 5,
		}, groupName, retry)

	default:
		panic("queue driver is not support")
	}

	mutex.Lock()
	defer mutex.Unlock()
	mqProducerInstanceMap[groupName] = mqClient

	return mqClient, nil
}

// NewConsumer 新建一个消费者实例
func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
	if item, ok := mqConsumerInstanceMap[groupName]; ok {
		return item, nil
	}

	endpoints := cfg.GetSliceString("queue.endpoints", []string{})
	driver := cfg.GetString("queue.driver", "")
	if len(endpoints) == 0 {
		return mqClient, errors.New("endpoints is not found.")
	}

	if groupName == "" {
		return mqClient, errors.New("mq groupName is empty.")
	}

	switch driver {
	case "rocketmq":
		mqClient = RegisterRocketConsumerMust(endpoints, groupName)
	case "redis":
		passwd := cfg.GetString("queue.passwd", "")
		dbnum := cfg.GetInt("queue.redisdb", 0)
		timeout := cfg.GetInt("queue.timeout", 3600)
		mqClient = RegisterRedisMqConsumerMust(RedisOption{
			Addr:    endpoints[0],
			Passwd:  passwd,
			DBnum:   dbnum,
			Timeout: timeout,
		}, PoolOption{
			5, 50, 5,
		}, groupName)
	default:
		panic("queue driver is not support")

	}

	mutex.Lock()
	defer mutex.Unlock()
	mqConsumerInstanceMap[groupName] = mqClient

	return mqClient, nil
}

func (m *MqMsg) BodyString() string {
	return string(m.Body)
}
